Netty HashedWheelTimer时间轮源码学习

HashedWheelTimer时间轮源码学习

HashedWheelTimer时间轮的简介

  • HashedWheelTimer是Netty中的一个基础工具类,主要用来高效处理大量定时任务,且任务对时间精度要求相对不高, 比如链接超时管理等场景, 缺点是, 内存占用相对较高。但是在使用时要注意任务里不要有太耗时的操作, 否则会阻塞Worker线程, 导致tick不准
  • HashedWheelTimer主要还是一个DelayQueue和一个时间轮算法组合
  • 如下图,可以看到HashedWheelTimer是由一个环形链表及数组构成
    HashedWheelTimer原理图
  • 如下图,可以解释为什么在使用HashedWheelTimer不能有太耗时的操作,因为worker的执行时,任务是串行的
    HashedWheelTimer执行过程
  • 如下图,可以看到HashedWheelTimer是由HashedWheelBucket数组, HashedWheelTimeout链表和工作线程Worker组成,所以我们的源码分析也主要从这几个类入手
    HashedWheelTimer类图

源码解析

HashedWheelTimer中的基本字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//实例计数器
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
//实例过多警告值
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
//实际数量限制
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
//资源泄漏检测器
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
.newResourceLeakDetector(HashedWheelTimer.class, 1);
//工作线程状态更新
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
//泄漏值
private final ResourceLeakTracker<HashedWheelTimer> leak;
//工作对象
private final Worker worker = new Worker();
//工作线程
private final Thread workerThread;

public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState; // 0 - init, 1 - started, 2 - shut down
//tick的时长,也就是指针多久转一格
private final long tickDuration;
//时间轮数组
private final HashedWheelBucket[] wheel;
// 这是一个标示符,用来快速计算任务应该呆的格子。
private final int mask;
//开始时间已初始化
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
//任务队列
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
//关闭的任务队列
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
//挂起超时次数
private final AtomicLong pendingTimeouts = new AtomicLong(0);
//最大挂起超时次数
private final long maxPendingTimeouts;
//开始时间
private volatile long startTime;
  • 从以上源码中我们可以大概了解到一个时间轮的执行依赖哪些条件,其中我们的任务都是基于Queue来实现的,但是这里我们要注意的是,这里的Queue是基于jctools中的Queue,以此得到更高的性能
  • mask标识符用来做位运算
  • 通过原子类来保证并发情况下的一致性
  • 这里我觉得值得我们学习的地方,是此处引用了资源泄露检测器,当资源超过64的时候就会进行告警,在细节方面netty考虑的非常全面,这个也是我们在平时编码的时需要学习的

HashedWheelTimer构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public HashedWheelTimer(
ThreadFactory threadFactory, // 用来创建worker线程
long tickDuration,// tick的时长,也就是指针多久转一格
TimeUnit unit, // tickDuration的时间单位
int ticksPerWheel, // 一圈有几格
boolean leakDetection, // 是否开启内存泄露检测
long maxPendingTimeouts //最大挂起超时次数
) {

if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// 将ticksPerWheel标准化为2的幂并初始化轮子.
wheel = createWheel(ticksPerWheel);
// 这是一个标示符,用来快速计算任务应该呆的格子。
// 我们知道,给定一个deadline的定时任务,其应该呆的格子=deadline%wheel.length.但是%操作是个相对耗时的操作,所以使用一种变通的位运算代替:
// 因为一圈的长度为2的n次方,mask = 2^n-1后低位将全部是1,然后deadline&mast == deadline%wheel.length
// java中的HashMap也是使用这种处理方法
mask = wheel.length - 1;

// 转换成纳秒处理
long duration = unit.toNanos(tickDuration);

// 校验是否存在溢出。即指针转动的时间间隔不能太长而导致tickDuration*wheel.length>Long.MAX_VALUE
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}

if (duration < MILLISECOND_NANOS) {
if (logger.isWarnEnabled()) {
logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",
tickDuration, MILLISECOND_NANOS);
}
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}

// 创建worker线程
workerThread = threadFactory.newThread(worker);

// 这里默认是启动内存泄露检测:当HashedWheelTimer实例超过当前cpu可用核数*4的时候,将发出警告
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

this.maxPendingTimeouts = maxPendingTimeouts;

if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
//发起警告
reportTooManyInstances();
}
}
  • 这里要注意,如果ticksPerWheel的默认值是512
  • HashedWheelTimer其实最终都是转换成纳秒处理的

HashedWheelTimer的createWheel方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
// 初始化ticksPerWheel的值为不小于ticksPerWheel的最小2的n次方
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
// 初始化wheel数组
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
  • 这里要注意,创建时间轮数组的时候,最大长度不能超过2的30次方

HashedWheelTimer的normalizeTicksPerWheel方法

1
2
3
4
5
6
7
8
// 初始化ticksPerWheel的值为不小于ticksPerWheel的最小2的n次方
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
  • 这里通过位运算来初始化每个轮盘的刻度
  • 但这里有个问题,如果轮盘大小指定过大,这里的循环次数也会更多,性能会存在问题,此处可以进行优化[jdk1.8 hashmap的hash算法,后面深入了解下]

HashedWheelTimer的start方法(时间轮启动的方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 启动时间轮。这个方法其实不需要显示的主动调用,因为在添加定时任务(newTimeout()方法)的时候会自动调用此方法。
// 这个是合理的设计,因为如果时间轮里根本没有定时任务,启动时间轮也是空耗资源
public void start() {
// 判断当前时间轮的状态,如果是初始化,则启动worker线程,启动整个时间轮;如果已经启动则略过;如果是已经停止,则报错
// 这里是一个Lock Free的设计。因为可能有多个线程调用启动方法,这里使用AtomicIntegerFieldUpdater原子的更新时间轮的状态
switch (WORKER_STATE_UPDATER.get(this)) {
//如果时间轮还没有启动,则更改状态并启动
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
//如果当前时间轮已经启动,则跳出该逻辑
case WORKER_STATE_STARTED:
break;
//如果是关闭状态,抛出无法启动异常
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
//如果工作状态未指定,则表示该程序异常,直接error
default:
throw new Error("Invalid WorkerState");
}

// 等待worker线程初始化时间轮的启动时间
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}

HashedWheelTimer的stop方法(时间轮停止的方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public Set<Timeout> stop() {
// worker线程不能停止时间轮,也就是加入的定时任务,不能调用这个方法。
// 不然会有恶意的定时任务调用这个方法而造成大量定时任务失效
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
// 尝试CAS替换当前状态为“停止:2”。如果失败,则当前时间轮的状态只能是“初始化:0”或者“停止:2”。直接将当前状态设置为“停止:2“
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}

return Collections.emptySet();
}

try {
boolean interrupted = false;
//如果工作线程存活
while (workerThread.isAlive()) {
//中断工作线程
//interrupt()不能中断在运行中的线程,它只能改变中断状态而已。
workerThread.interrupt();
try {
//工作线程加入本地线程
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
//如果发现线程已经被打上中断标识
if (interrupted) {
//改变当前线程状态
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
return worker.unprocessedTimeouts();
}

HashedWheelTimer的newTimeout方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 参数校验
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}

long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
//待处理超时数 pendingTimeoutsCount 大于或等于允许的最大挂起
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果时间轮没有启动,则启动
start();

//将超时添加到超时队列,该队列将在下一个时钟处理。
//在处理过程中,所有排队的HashedWheelTimeouts都将添加到正确的HashedWheelBucket中。
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// 防止溢出。
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 这里定时任务不是直接加到对应的格子中,而是先加入到一个队列里,然后等到下一个tick的时候,会从队列里取出最多100000个任务加入到指定的格子中
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}

Worker类

Worker是时间轮的核心线程类。tick的转动,过期任务的处理都是在这个线程中处理的。我们可以看到Worker实现Runnable接口,也就意味着我们的时间轮中是由worker来创建线程并执行任务

1
2
3
4
5
6
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

private long tick;
//... 省略方法
}

Worker类中的run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public void run() {
// 初始化startTime.只有所有任务的的deadline都是想对于这个时间点
startTime = System.nanoTime();
if (startTime == 0) {
// 由于System.nanoTime()可能返回0,甚至负数。并且0是一个标示符,用来判断startTime是否被初始化,所以当startTime=0的时候,重新赋值为1
startTime = 1;
}

// 唤醒阻塞在start()的线程
startTimeInitialized.countDown();
// 只要时间轮的状态为WORKER_STATE_STARTED,就循环的“转动”tick,循环判断响应格子中的到期任务
do {
// waitForNextTick方法主要是计算下次tick的时间, 然后sleep到下次tick
// 返回值就是System.nanoTime() - startTime, 也就是Timer启动后到这次tick, 所过去的时间
final long deadline = waitForNextTick();
if (deadline > 0) { // 可能溢出或者被中断的时候会返回负数, 所以小于等于0不管
// 获取tick对应的格子索引
int idx = (int) (tick & mask);
// 移除被取消的任务
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
// 从任务队列中取出任务加入到对应的格子中
transferTimeoutsToBuckets();
System.out.println("bucket"+bucket+",idx"+idx);
// 过期执行格子中的任务
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

// 这里应该是时间轮停止了,清除所有格子中的任务,并加入到未处理任务列表,以供stop()方法返回
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将还没有加入到格子中的待处理定时任务队列中的任务取出,如果是未取消的任务,则加入到未处理任务队列中,以供stop()方法返回
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 处理取消的任务
processCancelledTasks();
}

Worker类中的transferTimeoutsToBuckets方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 将newTimeout()方法中加入到待处理定时任务队列中的任务加入到指定的格子中
private void transferTimeoutsToBuckets() {
// 每次tick只处理10w个任务,以免阻塞worker线程
// adds new timeouts in a loop.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
//System.out.println("当前times.size"+timeouts.size());
// 如果没有任务了,直接跳出循环
if (timeout == null) {
// all processed
break;
}
// 还没有放入到格子中就取消了,直接略过
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
// 计算任务需要经过多少个tick
long calculated = timeout.deadline / tickDuration;
// 计算任务的轮数
timeout.remainingRounds = (calculated - tick) / wheel.length;

//如果任务在timeouts队列里面放久了, 以至于已经过了执行时间, 这个时候就使用当前tick, 也就是放到当前bucket, 此方法调用完后就会被执行.
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
System.out.println("tick:"+ticks);
int stopIndex = (int) (ticks & mask);
// 将任务加入到响应的格子中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}b
文章作者: 怀风
文章链接: http://blog.leishunyu.com/2019/03/27/2019-03-27-Netty -HashedWheelTimer时间轮学习/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Maple